Design, Discuss, and Reinvent a Goroutine Container

Authors
  • Mohan Liu

Abstract

In the preceding discussion, we explored the complexities of managing goroutines. This time, we aim to address these challenges. A fundamental aspect of software engineering is abstraction. Our goal is to devise a goroutine manager that handles these intricacies, allowing developers to concentrate on the core tasks at hand.

Languages like Java provide an ExecutorService or a thread pool to manage concurrency. However, before adopting a third-party pool library from GitHub, it is worth analyzing the underlying principles of a thread pool. We'll examine everything from the trade-offs involved in designing its API to the details of its implementation.

Objectives

1. Task Flexibility

The worker pool should be capable of handling a diverse array of computational patterns, ranging from tree/graph traversal algorithms to map-reduce operations.

2. Controlled Concurrency

The pool should manage the lifecycle of goroutines to prevent race conditions, deadlocks, and the exhaustion of system resources. This involves orchestrating goroutines in a manner that optimizes performance while adhering to a pre-defined concurrency model.

3. Hierarchical Task Delegation

A goroutine should be able to spawn sub-tasks while still adhering to the concurrency constraints set by the worker pool.

4. Additional thoughts

  1. Monitoring and Logging.
    Consider if and how the system provides insight into its performance and operation. Does it offer logging, real-time monitoring, or analytics?

  2. Integration with Existing Systems.
    How easily can this worker pool be integrated with existing systems? Does it provide interfaces or hooks for common frameworks or libraries?

  3. Customization and Extensibility.
    Is the worker pool customizable? Can users extend its functionality, add plugins, or modify its behavior?

  4. Security Considerations.
    Are there any security features or compliance with security standards?

  5. Performance Metrics.
    Are there any performance benchmarks or goals the worker pool aims to meet?

5. Goroutine Management Encapsulation and Orchestration

  1. Goroutine Number Specification.
    Users can specify the exact number of goroutines, allowing for a customized balance between concurrency and resource use.

  2. Dynamic Resource Scaling.
    The pool can dynamically add or remove goroutines in real time, adapting to the current load without service interruption. Features like performance metrics and backpressure management would be good to have.

  3. Context Integration.
    Using context as a cancellation and timeout signal ensures that goroutines can be stopped gracefully, aiding in the control of the resource lifecycle.

  4. Resource Exhaustion Mitigation.
    The design anticipates and mitigates resource depletion, with mechanisms to handle spikes in demand and ensure system stability.

  5. Graceful Shutdown.
    On termination, the worker pool guarantees the proper reclamation of resources, with each goroutine completing its current task to prevent data loss or corruption.

  6. Task Result and Error Handling.
    Tasks are designed to return results or errors in a consistent manner, allowing for straightforward integration with the rest of the application.

  7. OS Signal Handling.
    The worker pool listens for OS signals, ensuring that it reacts appropriately to external commands like graceful shutdown or emergency termination.
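
As a rough illustration of how the context integration, graceful shutdown, and OS signal handling objectives could fit together, here is a minimal sketch. It assumes Go 1.19+ and uses `signal.NotifyContext` from the standard library; `runWorkers` and `runDemo` are hypothetical names made up for this example, not part of the final pool.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"sync"
	"sync/atomic"
	"syscall"
	"time"
)

// runWorkers starts n goroutines that consume tasks until the channel is
// closed or ctx is cancelled. It returns how many tasks were completed.
func runWorkers(ctx context.Context, n int, tasks <-chan func()) int {
	var wg sync.WaitGroup
	var completed atomic.Int64
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return // stop picking up new work
				case task, ok := <-tasks:
					if !ok {
						return // no more tasks will arrive
					}
					task() // a task that has started always runs to completion
					completed.Add(1)
				}
			}
		}()
	}
	wg.Wait()
	return int(completed.Load())
}

// runDemo (hypothetical) cancels the workers on Ctrl-C, SIGTERM, or a
// two-second timeout, whichever comes first.
func runDemo(workers, numTasks int) int {
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()
	ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
	defer cancel()

	tasks := make(chan func(), numTasks)
	for i := 0; i < numTasks; i++ {
		tasks <- func() { time.Sleep(10 * time.Millisecond) }
	}
	close(tasks)
	return runWorkers(ctx, workers, tasks)
}

func main() {
	fmt.Println("completed:", runDemo(2, 3))
}
```

The design choice worth noting is that cancellation only stops workers from picking up new tasks; a task that is already running finishes, which matches the graceful shutdown objective above.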

API Design and Trade-off Discussion

Before delving into the complexities of managing goroutines, it's important to consider the various components that will be integral to our design, as well as their interactions.

Overview

The structure of our goroutine container comprises several connected components that work together to streamline concurrent task management:

  • GoRoutineContainer: Serves as the primary orchestrator within our system. It utilizes a TaskQueue to schedule the execution of tasks and a ResultPipeline to handle the aggregation of results. Its key responsibilities include adding tasks to the queue and dispatching them to available Workers.

  • TaskQueue: Acts as a holding area for tasks awaiting processing. It is capable of enqueuing tasks and checking if the queue is empty, ensuring a steady supply of work for the Workers.

  • ResultPipeline: A dedicated channel for accumulating the results after task executions. It provides the functionality to add new results, facilitating the collection and subsequent processing of output data.

  • Task: The fundamental unit of work within our system. Each task encompasses an Execute method, which, upon invocation, yields a Result. This design encapsulates the work logic and outcome within a single entity.

  • Worker: These are the workhorses of the system, each linked to a Task. Workers are responsible for processing tasks—executing the contained logic—and collecting results, which are then reported back to the ResultPipeline.

  • Result: Represents the output of a Task. It includes a correlation ID to trace the result back to its originating task, ensuring that each piece of work can be accurately tracked and correlated with its outcome.

The flow within the system is as follows:

  • Task Allocation: The GoRoutineContainer places tasks into the TaskQueue.

  • Task Processing: Workers retrieve tasks from the TaskQueue, execute them, and generate results.

  • Result Aggregation: Workers submit the produced Results to the ResultPipeline.

  • Result Utilization: The ResultPipeline holds the results, which can then be used for further processing, analysis, or reporting.
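
The flow above can be sketched in a few lines of Go. This is only an illustration under simplifying assumptions: buffered channels stand in for the TaskQueue and the ResultPipeline, and `squareTask` and `runContainer` are hypothetical names invented for the example.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Task is the unit of work; Execute yields a Result.
type Task interface {
	Execute() Result
}

// Result carries a correlation ID back to the originating task.
type Result struct {
	TaskID int
	Value  int
}

// squareTask is a hypothetical task used only for this illustration.
type squareTask struct{ id, n int }

func (t squareTask) Execute() Result { return Result{TaskID: t.id, Value: t.n * t.n} }

// runContainer wires the pieces together and returns results sorted by TaskID.
func runContainer(workers int, tasks []Task) []Result {
	queue := make(chan Task, len(tasks))      // plays the role of the TaskQueue
	pipeline := make(chan Result, len(tasks)) // plays the role of the ResultPipeline

	// Task Allocation: the container places tasks into the queue.
	for _, t := range tasks {
		queue <- t
	}
	close(queue)

	// Task Processing and Result Aggregation: workers execute and report.
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range queue {
				pipeline <- t.Execute()
			}
		}()
	}
	wg.Wait()
	close(pipeline)

	// Result Utilization: collect and correlate by ID.
	var out []Result
	for r := range pipeline {
		out = append(out, r)
	}
	sort.Slice(out, func(i, j int) bool { return out[i].TaskID < out[j].TaskID })
	return out
}

func main() {
	tasks := []Task{squareTask{1, 2}, squareTask{2, 3}, squareTask{3, 4}}
	for _, r := range runContainer(2, tasks) {
		fmt.Printf("task %d -> %d\n", r.TaskID, r.Value)
	}
}
```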

Entities

1. Task

  1. Flexibility considerations

    1. Variadic Arguments

    2. Results and Errors Handling.
      To handle multiple return values and errors, you could define a Result type that includes a slice of interface{} values for results and an error field for error handling. After the task runs, its results and any error are stored in a Result object, which can then be placed on the result queue.

    3. Runnable/Executable Interface.
      Taking inspiration from Java's Runnable interface, which has a single run() method, keeps the design simple. In Go, this could be an interface with a Run() method. This method should encapsulate the task's logic and be executable by a worker without additional context.

    4. Self-Contained Logic.

    5. Generics (Type Parameters).
      If the language supports generics (as Java does, and as Go does since version 1.18 through type parameters), they can be used to create a more type-safe way of passing arguments and receiving results, while still allowing for flexibility.

    6. Context Passing.
      For tasks that might need cancellation or timeout, a context could be passed to the Run() method. This allows the caller to control task execution externally, such as cancelling a task that is taking too long to complete.

    7. Callbacks or Events
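
To make the generics and context-passing considerations more concrete, here is a small sketch of a generic task whose Run method accepts a context. It assumes Go 1.18+ for type parameters; `slowDouble` and `runWithTimeout` are hypothetical helpers that exist only for this illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Task is a generic unit of work. Run receives a context so the caller can
// cancel it or impose a deadline from the outside.
type Task[T any] interface {
	Run(ctx context.Context) (T, error)
}

// TaskFunc adapts a plain function to the Task interface.
type TaskFunc[T any] func(ctx context.Context) (T, error)

func (f TaskFunc[T]) Run(ctx context.Context) (T, error) { return f(ctx) }

// slowDouble is a hypothetical task: it waits for delay and returns 2*n,
// unless the context is cancelled first.
func slowDouble(n int, delay time.Duration) Task[int] {
	return TaskFunc[int](func(ctx context.Context) (int, error) {
		select {
		case <-time.After(delay):
			return 2 * n, nil
		case <-ctx.Done():
			return 0, ctx.Err()
		}
	})
}

// runWithTimeout executes a task under a deadline and also reports whether
// the deadline was the reason for failure.
func runWithTimeout[T any](t Task[T], timeout time.Duration) (T, bool, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	v, err := t.Run(ctx)
	return v, errors.Is(err, context.DeadlineExceeded), err
}

func main() {
	v, timedOut, err := runWithTimeout(slowDouble(21, time.Millisecond), time.Second)
	fmt.Println(v, timedOut, err)

	_, timedOut, err = runWithTimeout(slowDouble(21, time.Second), time.Millisecond)
	fmt.Println(timedOut, err)
}
```

With type parameters the caller gets an `int` back instead of an `interface{}` that needs a type assertion, at the cost of a pool that can only hold tasks of one result type at a time.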

  2. Implementation Considerations

    • Option 1: Object-Oriented Approach (interface with a single method)

      type Task interface {
          Perform() error
      }
      
      type SomeTask struct {
          SomeData string
      }
      
      func (st *SomeTask) Perform() error {
          //...
          return nil
      }
      
      type TaskExecutor struct{}
      
      func (te *TaskExecutor) Execute(t Task) error {
          return t.Perform()
      }
      
    • Option 2: Functional Approach (closure)

      type TaskFunc func() error

      // someTask captures someVariable in a closure and returns the task.
      func someTask(someVariable string) TaskFunc {
          return func() error {
              // Read the file
              // ...
              return nil
          }
      }

      func ExecuteTask(task TaskFunc) error {
          return task()
      }
      
| Consideration | Object-Oriented (OOP) | Functional (FP with closures) |
| --- | --- | --- |
| Extendability | Utilizes classes and interfaces to seamlessly facilitate the expansion of code. Ideal for anticipated future development. | -- |
| Development Complexity | Offers a structured framework that simplifies developing large or intricate systems through clear separation of concerns. | -- |
| Expression | -- | Employs concise functions leading to elegant, expressive solutions for straightforward tasks. |
| Cognitive Load | -- | May introduce a higher cognitive burden due to complex scopes and challenges in modular design as the application scales. |
| Task Composition | Enables the creation of new instances by passing the result of one task to another, fostering maintainability. | Task functions are created from the results of other tasks, which can lead to less readable nested functions and increased mental mapping in multi-step sequences. |
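
The two options are not mutually exclusive in Go. As a sketch (borrowing the adapter idea behind `http.HandlerFunc` in the standard library), a function type can implement the interface, so struct-based and closure-based tasks can be composed together. `Sequence`, `appendTask`, and `errSaveFailed` are hypothetical names used only for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Task is the object-oriented contract from Option 1.
type Task interface {
	Perform() error
}

// TaskFunc is the closure type from Option 2. Giving it a Perform method
// lets any closure satisfy Task.
type TaskFunc func() error

func (f TaskFunc) Perform() error { return f() }

// Sequence composes tasks: it runs them in order and stops at the first error.
func Sequence(tasks ...Task) Task {
	return TaskFunc(func() error {
		for _, t := range tasks {
			if err := t.Perform(); err != nil {
				return err
			}
		}
		return nil
	})
}

// appendTask is a hypothetical struct-based task that records its name.
type appendTask struct {
	name string
	log  *[]string
}

func (a appendTask) Perform() error {
	*a.log = append(*a.log, a.name)
	return nil
}

var errSaveFailed = errors.New("save failed")

func main() {
	var log []string
	pipeline := Sequence(
		appendTask{"load", &log},
		TaskFunc(func() error { log = append(log, "transform"); return nil }),
		TaskFunc(func() error { return errSaveFailed }),
		appendTask{"never runs", &log},
	)
	fmt.Println(pipeline.Perform(), log)
}
```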

2. Workers

  1. How to control the number of goroutines

    • Option 1: Semaphore tokens

      var sem = make(chan struct{}, maxGoroutines)
      
      for task := range tasks {
          sem <- struct{}{} // Acquire a token
          go func(task Task) {
              defer func() { <-sem }() // Release the token
              // ... do the task
          }(task)
      }
      
    • Option 2: A fixed number of goroutines consuming from a shared channel (worker pool)

      tasks := make(chan Task, numberOfTasks)
      for i := 0; i < maxGoroutines; i++ {
          go worker(tasks)
      }
      
      // worker is a function that processes tasks from the channel
      func worker(tasks <-chan Task) {
          for task := range tasks {
              // ... do the task
          }
      }
      
| Feature | Semaphore Approach | Worker Pool Approach |
| --- | --- | --- |
| Scaling | Changing the concurrency level is non-trivial | Relatively easy to add more workers |
| Memory Usage | Potentially higher with many short-lived goroutines | Generally lower and more predictable |
| Overhead | Goroutine creation/destruction overhead for each task | No overhead from spinning up/down goroutines for each task |
| Complexity | Simple to implement, harder to adjust dynamically | More complex to implement, easier to adjust dynamically |
| Task Execution Order | No inherent prioritization, FIFO if channel is used | No inherent prioritization, FIFO if channel is used |
| Dynamic Adjustability | Not straightforward to change channel capacity at runtime | Can simply spin up new workers at runtime if needed |
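
For reference, the semaphore option can be turned into a runnable sketch that also measures the concurrency bound. It assumes Go 1.19+ for `atomic.Int64`; `runBounded` is a hypothetical helper name.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runBounded runs every task in its own goroutine but lets at most
// maxGoroutines of them execute at once. It returns the highest number of
// tasks that were observed running at the same time.
func runBounded(maxGoroutines int, tasks []func()) int64 {
	sem := make(chan struct{}, maxGoroutines)
	var wg sync.WaitGroup
	var running, peak atomic.Int64

	for _, task := range tasks {
		sem <- struct{}{} // acquire a token; blocks while maxGoroutines are busy
		wg.Add(1)
		go func(task func()) {
			defer wg.Done()
			defer func() { <-sem }() // release the token

			// Record the peak number of concurrently running tasks.
			n := running.Add(1)
			for {
				p := peak.Load()
				if n <= p || peak.CompareAndSwap(p, n) {
					break
				}
			}
			task()
			running.Add(-1)
		}(task)
	}
	wg.Wait()
	return peak.Load()
}

func main() {
	tasks := make([]func(), 10)
	for i := range tasks {
		tasks[i] = func() { time.Sleep(5 * time.Millisecond) }
	}
	fmt.Println("peak concurrency:", runBounded(3, tasks))
}
```

Note that a new goroutine is still created for every task, which is the overhead the table refers to.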

In short, even though the worker pool approach is somewhat more complex to implement, it is generally easier to extend and to adjust at runtime. The worker pool approach is therefore chosen.
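
A minimal version of the worker pool approach might look like the following. This is a sketch rather than the final implementation: `Pool`, `AddWorkers`, and `sumSquares` are hypothetical names, and error handling and cancellation are left out on purpose.

```go
package main

import (
	"fmt"
	"sync"
)

// Pool is a hypothetical minimal worker pool: a set of goroutines consuming
// from one shared channel.
type Pool struct {
	tasks chan func()
	wg    sync.WaitGroup
}

func NewPool(workers, queueSize int) *Pool {
	p := &Pool{tasks: make(chan func(), queueSize)}
	p.AddWorkers(workers)
	return p
}

// AddWorkers starts n more workers. It can also be called after construction
// to scale up at runtime, as long as Close has not been called.
func (p *Pool) AddWorkers(n int) {
	for i := 0; i < n; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for task := range p.tasks {
				task()
			}
		}()
	}
}

// Submit blocks when the queue is full, which gives natural backpressure.
func (p *Pool) Submit(task func()) { p.tasks <- task }

// Close stops accepting tasks and waits for the queued work to finish.
func (p *Pool) Close() {
	close(p.tasks)
	p.wg.Wait()
}

// sumSquares is a small demonstration: it computes 1*1 + 2*2 + ... + n*n
// by submitting one task per term.
func sumSquares(workers, n int) int {
	p := NewPool(workers, n)
	var mu sync.Mutex
	total := 0
	for i := 1; i <= n; i++ {
		i := i
		p.Submit(func() {
			mu.Lock()
			total += i * i
			mu.Unlock()
		})
	}
	p.AddWorkers(1) // scale up while tasks are in flight
	p.Close()
	return total
}

func main() {
	fmt.Println(sumSquares(3, 10)) // prints 385
}
```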

3. Result

When designing a result object within the context of a worker pool, it's crucial to consider how the container will manage and utilize the outcomes of the tasks it processes. The design of the result object can be significantly influenced by the nature of the tasks and the importance of their individual results.

For heterogeneous tasks, where each task's result is of distinct importance, the result object may take on characteristics similar to a promise-like construct found in many asynchronous programming environments. This type of result object would typically allow for encapsulation of each task's outcome, enabling the following features:

  • Asynchronous Resolution: The result object could provide a mechanism to asynchronously retrieve the outcome of a task once it's available, much like how a promise resolves with a value.

  • Error Handling: It could include methods for catching and handling errors that may occur during task execution.

  • Chaining: It might also support chaining operations, where a subsequent task could be queued or executed depending on the result of the current one.

On the other hand, for bulk operations, such as Conway's Game of Life or tasks amenable to a map-reduce paradigm, individual task results may be less significant compared to the collective progress or final aggregated result. In such scenarios, the design of the result object might prioritize different aspects:

  • Progress Tracking: The result object could focus on tracking the cumulative progress of all tasks, possibly through counters or progress events.

  • Aggregation: It might provide methods to combine or reduce results from individual tasks into a coherent whole.

  • Notification: The result object could be designed to notify the container when all tasks have reached a certain milestone or when the entire operation is complete.

Overall, the design of the result object should align with the broader goals and concerns of the worker pool's container, ensuring that it provides the necessary functionality to manage task outcomes effectively, whether they're handled individually or in aggregate.
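
Go has no built-in promise type, but the promise-like result object described above can be approximated with a channel that is closed on completion. The sketch below assumes Go 1.18+; `Future`, `Go`, `Then`, and `errBoom` are hypothetical names for illustration only.

```go
package main

import (
	"errors"
	"fmt"
)

// Future is a hypothetical promise-like handle for one task's outcome.
type Future[T any] struct {
	done chan struct{}
	val  T
	err  error
}

// Go starts fn in a goroutine and returns immediately (asynchronous resolution).
func Go[T any](fn func() (T, error)) *Future[T] {
	f := &Future[T]{done: make(chan struct{})}
	go func() {
		defer close(f.done) // closing publishes val and err to all waiters
		f.val, f.err = fn()
	}()
	return f
}

// Await blocks until the task has finished, then returns its value or error.
func (f *Future[T]) Await() (T, error) {
	<-f.done
	return f.val, f.err
}

// Then chains a follow-up task that runs only if f succeeded. It is a free
// function because Go methods cannot introduce new type parameters.
func Then[T, U any](f *Future[T], next func(T) (U, error)) *Future[U] {
	return Go(func() (U, error) {
		v, err := f.Await()
		if err != nil {
			var zero U
			return zero, err // propagate the error, skip next
		}
		return next(v)
	})
}

var errBoom = errors.New("boom")

func main() {
	length := Then(
		Go(func() (string, error) { return "hello", nil }),
		func(s string) (int, error) { return len(s), nil },
	)
	fmt.Println(length.Await())

	failed := Then(
		Go(func() (int, error) { return 0, errBoom }),
		func(n int) (int, error) { return n + 1, nil },
	)
	fmt.Println(failed.Await())
}
```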

Considerations

| Mechanism | Heterogeneous Tasks Suitability | Bulk Operations Suitability |
| --- | --- | --- |
| Callback | Suitable for individual notifications, but can become complex with diverse outcomes. | Less suitable due to challenges in result aggregation and callback management. |
| Channel | Effective for task result communication, especially when results are significant and diverse. | Ideal for concurrent result processing and aggregation, aligns well with Go's concurrency model. |
| Future/Promise | Highly suitable for encapsulating asynchronous results, with built-in error handling and result chaining. | Possible with additional logic for result collection and aggregation; Mono aligns well for single eventual outcomes. |
| Shared Variable | Less recommended due to synchronization complexity for diverse results. | Effective when used with synchronization mechanisms for result aggregation. |
| Mono | Well-suited for tasks requiring a single or no result, with reactive handling of results and errors. | Can represent the final aggregated result, but might require a collective operation like Mono.zip or Mono.when for multiple Monos. |
| Flux | Less common as it's geared towards data streams rather than discrete task results. | Particularly well-suited for managing data streams and allows for sophisticated operations like filtering, reduction, and transformation in reactive programming. |

A channel is chosen because it is effective for both heterogeneous tasks and bulk operations.

 type TaskStatus string

 const (
     TaskSuccess    TaskStatus = "SUCCESS"
     TaskFailed     TaskStatus = "FAILED"
     TaskInProgress TaskStatus = "IN_PROGRESS"
 )
 
 type TaskResult struct {
     TaskID      string      // Unique identifier for the task
     Result      interface{} // The actual result of the task, can be any type
     Status      TaskStatus  // Optional for Promise, The status of the task
     Error       error       // Error information, if applicable
     CreatedAt   time.Time   // Timestamp when the task was created
     CompletedAt time.Time   // Timestamp when the task was completed
 }
 
 var resultChan chan TaskResult // Channel that carries task results to the consumer
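
To show how a result channel supports the bulk, map-reduce style case, here is a small sketch in which workers emit one result per input line and a single consumer reduces them. The `TaskResult` here is a trimmed-down copy of the struct above, and `countWords` is a hypothetical example function.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// TaskResult is a reduced version of the struct above, kept small so the
// example stays self-contained.
type TaskResult struct {
	TaskID string
	Result interface{}
	Error  error
}

// countWords maps each line to a word count in a worker goroutine and
// reduces the counts on the consumer side of the results channel.
func countWords(lines []string, workers int) (total, failed int) {
	jobs := make(chan int)
	results := make(chan TaskResult)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := range jobs {
				results <- TaskResult{
					TaskID: fmt.Sprintf("line-%d", i),
					Result: len(strings.Fields(lines[i])),
				}
			}
		}()
	}

	// Feed the jobs, then close results once every worker has finished.
	go func() {
		for i := range lines {
			jobs <- i
		}
		close(jobs)
		wg.Wait()
		close(results)
	}()

	// Reduce: only the aggregate matters here, not individual results.
	for r := range results {
		if r.Error != nil {
			failed++
			continue
		}
		if n, ok := r.Result.(int); ok {
			total += n
		}
	}
	return total, failed
}

func main() {
	total, failed := countWords([]string{"a b c", "d e", ""}, 2)
	fmt.Println(total, failed) // prints 5 0
}
```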

4. Tasks Queue

Key considerations:

  • Thread Safety / Concurrency Safety:

    • This is about ensuring that the queue can handle concurrent access without data races or corruption.

  • Scalability:

    • Queue Length: The length of the queue must be managed. An unbounded queue can lead to high memory usage and potential system instability. Conversely, a bounded queue must have a strategy for when it's full (e.g., blocking, dropping tasks, or pushing back on the producer).

  • Task Priority: Good to have, but the complexity grows.

    • Implementation: Using multiple queues or a priority queue for different priorities is a common approach. Workers would check the higher-priority queue first. The problem is that low-priority tasks could starve due to a constant inflow of higher-priority tasks. Strategies like aging (increasing the priority of tasks over time) can mitigate this.

  • Monitoring and Logging:

    • Queue Metrics: Tracking metrics such as queue size over time, task wait times, and worker idle times can help in understanding system performance and bottlenecks.

    • Logging: Detailed logs can aid in debugging issues and understanding system behavior. However, logging should not significantly impact performance.

    • Health Checks: Regular health checks can monitor the status of the queue and workers to ensure they are functioning correctly and efficiently.

    • Alerting: Implementing an alert system that triggers notifications when certain thresholds are breached (e.g., queue size exceeds a limit) can help prevent system overloads.

  • Load Balancing:

    • Task Distribution: You might consider strategies for distributing tasks to workers in a way that balances the load, especially if tasks have varying complexity.

    • Worker Performance: Keeping track of worker performance metrics could allow for smarter load balancing, sending more tasks to faster workers.

  • Graceful Degradation:

    • Failure Handling: Decide how the system should degrade in the face of individual task failures. Should the task be retried, moved to a dead letter queue, or should there be a rollback mechanism?
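
The task priority consideration can be illustrated with two channels and a non-blocking receive. The sketch below checks the high-priority queue first and uses a simple fairness quota to limit starvation; this quota is a stand-in chosen for brevity, not the aging strategy itself. `PriorityQueue` and its methods are hypothetical, and the sketch assumes a single consumer because `streak` is not synchronized.

```go
package main

import "fmt"

// PriorityQueue is a hypothetical two-level queue backed by channels.
type PriorityQueue struct {
	high, low chan string
	maxStreak int // after this many consecutive high-priority picks, try low first
	streak    int // not synchronized: this sketch assumes a single consumer
}

func NewPriorityQueue(size, maxStreak int) *PriorityQueue {
	return &PriorityQueue{
		high:      make(chan string, size),
		low:       make(chan string, size),
		maxStreak: maxStreak,
	}
}

func (q *PriorityQueue) EnqueueHigh(t string) { q.high <- t }
func (q *PriorityQueue) EnqueueLow(t string)  { q.low <- t }

// tryRecv receives from ch without blocking.
func tryRecv(ch <-chan string) (string, bool) {
	select {
	case t := <-ch:
		return t, true
	default:
		return "", false
	}
}

// Dequeue is non-blocking. It prefers high-priority tasks, but once
// maxStreak of them have been served in a row it gives a waiting
// low-priority task a turn.
func (q *PriorityQueue) Dequeue() (string, bool) {
	if q.streak >= q.maxStreak {
		if t, ok := tryRecv(q.low); ok {
			q.streak = 0
			return t, true
		}
	}
	if t, ok := tryRecv(q.high); ok {
		q.streak++
		return t, true
	}
	t, ok := tryRecv(q.low)
	if ok {
		q.streak = 0
	}
	return t, ok
}

func main() {
	q := NewPriorityQueue(4, 2)
	for _, t := range []string{"h1", "h2", "h3"} {
		q.EnqueueHigh(t)
	}
	q.EnqueueLow("l1")
	for {
		t, ok := q.Dequeue()
		if !ok {
			break
		}
		fmt.Print(t, " ")
	}
	fmt.Println()
}
```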

Implementation Proposals

  • Mutex-Protected Queue

    type Task struct {
    	// Task fields and methods
    }
    
    type MutexQueue struct {
    	tasks []Task
    	lock  sync.Mutex
    }
    
    func (q *MutexQueue) Enqueue(task Task) {
    	q.lock.Lock()
    	defer q.lock.Unlock()
    	q.tasks = append(q.tasks, task)
    }
    
    func (q *MutexQueue) Dequeue() *Task {
    	...
    }
    
  • Atomic Operations

    
    type Task struct {
    	// Task fields and methods
    }
    
    type node struct {
    	value Task
    	next  *node
    }
    
    type AtomicQueue struct {
    	head *node
    	tail *node
    }
    
    func NewAtomicQueue() *AtomicQueue {
    	dummy := &node{} // Dummy node
    	return &AtomicQueue{
    		head: dummy,
    		tail: dummy,
    	}
    }
    
    func (q *AtomicQueue) Enqueue(task Task) {
    	newNode := &node{value: task}
    	for {
    		tail := loadNode(&q.tail)
    		next := loadNode(&tail.next)
    
    		if tail == loadNode(&q.tail) {
    			if next == nil {
    				if casNode(&tail.next, nil, newNode) {
    					casNode(&q.tail, tail, newNode)
    					return
    				}
    			} else {
    				casNode(&q.tail, tail, next)
    			}
    		}
    	}
    }
    
    func loadNode(ptr **node) *node {
    	return (*node)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(ptr))))
    }
    
    func casNode(ptr **node, old, new *node) bool {
    	return atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(ptr)), unsafe.Pointer(old), unsafe.Pointer(new))
    }
    
    
  • Lock-Free data structure (not covered, as it is out of scope)

  • Concurrent Collections (Java example)

    class Task {
    }
    
    private ConcurrentLinkedQueue<Task> queue = new ConcurrentLinkedQueue<>();
    
    public void addTask(Task task) {
        queue.offer(task);
    }
    
    public void processTasks() {
        Task task;
        while ((task = queue.poll()) != null) {
            task.execute();
        }
    }
    
  • Channel-Based Queues

    type Task struct {
    	// Task fields and methods.
    }
    
    type ChannelQueue struct {
    	taskChan chan Task
    }
    
    func NewChannelQueue(bufferSize int) *ChannelQueue {
    	return &ChannelQueue{
    		taskChan: make(chan Task, bufferSize),
    	}
    }
    
    func (q *ChannelQueue) Enqueue(task Task) {
    	q.taskChan <- task // Blocks when the buffer is full
    }
    
    func (q *ChannelQueue) Dequeue() (Task, bool) {
    	task, ok := <-q.taskChan // Blocks when the buffer is empty
    	return task, ok
    }
    
    func (q *ChannelQueue) Close() {
    	close(q.taskChan)
    }
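
The blocking Enqueue above is only one of the full-queue strategies mentioned earlier. As a sketch, a channel-based queue can also reject tasks immediately or wait with a deadline; `TryEnqueue`, `EnqueueContext`, `Len`, and `ErrQueueFull` are hypothetical additions and not part of the original type.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

type Task struct{ ID int }

// ErrQueueFull is returned when a task is rejected instead of queued.
var ErrQueueFull = errors.New("queue is full")

type ChannelQueue struct {
	taskChan chan Task
}

func NewChannelQueue(bufferSize int) *ChannelQueue {
	return &ChannelQueue{taskChan: make(chan Task, bufferSize)}
}

// TryEnqueue never blocks: when the buffer is full it returns ErrQueueFull
// so the producer can decide whether to drop, retry, or slow down.
func (q *ChannelQueue) TryEnqueue(task Task) error {
	select {
	case q.taskChan <- task:
		return nil
	default:
		return ErrQueueFull
	}
}

// EnqueueContext blocks until there is room or ctx is done, whichever
// happens first.
func (q *ChannelQueue) EnqueueContext(ctx context.Context, task Task) error {
	select {
	case q.taskChan <- task:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Len reports how many tasks are currently buffered (useful as a metric).
func (q *ChannelQueue) Len() int { return len(q.taskChan) }

func main() {
	q := NewChannelQueue(1)
	fmt.Println(q.TryEnqueue(Task{ID: 1})) // room available
	fmt.Println(q.TryEnqueue(Task{ID: 2})) // buffer full, task rejected

	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	fmt.Println(q.EnqueueContext(ctx, Task{ID: 3})) // gives up at the deadline
}
```

Compared with retrying in a tight loop, both variants let the producer wait or back off without spinning.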
    
Comparisons
| Implementation Proposal | Thread Safety: Pros and Cons | Monitoring and Logging: Ease and Effectiveness | Scalability: Queue Length and Full Queue Handling | Extensibility: Priority Management |
| --- | --- | --- | --- | --- |
| Mutex-Protected Queue | Pros: Simple to implement; easy to understand. Cons: Mutex contention can be a bottleneck; locking/unlocking overhead. | Ease: Relatively easy to add logging around lock operations. Effectiveness: Effective for tracking queue access and contention issues. | Queue Length: Easy to manage with condition variables. Full Queue: Can implement blocking or dropping tasks when full. | Can extend to support priorities by using multiple queues or a priority queue data structure. |
| Atomic Operations | Pros: Better under high contention; avoids locking overhead. Cons: Complex to implement; susceptible to the ABA problem. | Ease: Harder due to the low-level nature of operations. Effectiveness: Can be less intuitive to correlate atomic operations with higher-level actions. | Queue Length: More challenging to manage without locks. Full Queue: Requires additional logic to handle full queue scenarios, which can be complex. | More difficult to extend for priorities due to complexity. May require intricate algorithms to handle priority ordering atomically. |
| Lock-Free Queues | Pros: No locks involved, can be faster for certain workloads. Cons: Very complex to implement; difficult to debug. | Ease: Difficult due to the complexity of the lock-free algorithms. Effectiveness: Offers detailed insights if implemented correctly, but can be very challenging. | Queue Length: Management can be very complex due to the lock-free design. Full Queue: Handling a full queue is difficult and often requires fallback to locking mechanisms. | Extending with priority management is highly challenging due to the complexity of ensuring lock-free consistency with priority ordering. |
| Concurrent Collections | Pros: Provided by many standard libraries; high-level abstraction. Cons: Potentially less performant due to generality; may not fit all use cases. | Ease: Easier as many libraries come with built-in monitoring capabilities. Effectiveness: Generally effective and aligned with the abstraction level of the collection. | Queue Length: Often have built-in management. Full Queue: Typically have well-defined behavior for full queues, such as blocking or rejection policies. | Many concurrent collections already support priority management or can be easily extended by using priority queue implementations provided by the language or libraries. |
| Channel-Based Queues | Pros: Native to some languages (e.g., Go channels); built-in safety. Cons: Can be language-specific; varying performance characteristics. | Ease: Moderate, depending on language support for introspection. Effectiveness: Can be very effective, especially if the language/runtime provides good tools for channel monitoring. | Queue Length: Can be defined at channel creation. Full Queue: Channels block by default when full, but can be designed for non-blocking behavior or dropping messages. | Not inherently designed for priority management. Implementing priority would require additional structures or a separate priority channel system. |
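
The table notes that a mutex-protected queue can manage its length with condition variables. A minimal sketch of that idea, using `sync.Cond` from the standard library, is shown below; `BoundedQueue` and `transfer` are hypothetical names, and the queue stores plain ints to keep the example short.

```go
package main

import (
	"fmt"
	"sync"
)

// BoundedQueue is a hypothetical FIFO queue with a fixed capacity.
// Enqueue blocks while the queue is full; Dequeue blocks while it is empty.
type BoundedQueue struct {
	mu       sync.Mutex
	notEmpty *sync.Cond
	notFull  *sync.Cond
	items    []int
	capacity int // assumed to be greater than zero
}

func NewBoundedQueue(capacity int) *BoundedQueue {
	q := &BoundedQueue{capacity: capacity}
	q.notEmpty = sync.NewCond(&q.mu)
	q.notFull = sync.NewCond(&q.mu)
	return q
}

func (q *BoundedQueue) Enqueue(v int) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == q.capacity {
		q.notFull.Wait() // releases the lock while waiting
	}
	q.items = append(q.items, v)
	q.notEmpty.Signal()
}

func (q *BoundedQueue) Dequeue() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 {
		q.notEmpty.Wait()
	}
	v := q.items[0]
	q.items = q.items[1:]
	q.notFull.Signal()
	return v
}

// transfer pushes 1..n through a queue of the given capacity with one
// producer and one consumer, and returns the sum seen by the consumer.
func transfer(n, capacity int) int {
	q := NewBoundedQueue(capacity)
	go func() {
		for i := 1; i <= n; i++ {
			q.Enqueue(i)
		}
	}()
	sum := 0
	for i := 0; i < n; i++ {
		sum += q.Dequeue()
	}
	return sum
}

func main() {
	fmt.Println(transfer(100, 4)) // prints 5050
}
```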

Conclusion and Code Snippet

After going over the details and looking at different options, I've put together a solid worker pool implementation for you.

In my next blog post, we'll dive into how to use this worker pool to tackle some common coding problems, including how to handle a map-reduce situation. Additionally, we will introduce enhanced visualization of the worker pool's progress using Prometheus and Grafana.

Keep an eye out – we're about to get into some really practical stuff that could make your coding life a whole lot easier.

package workerpool

import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"sync"
	"sync/atomic"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// TaskFunc is the type of function that can be submitted to the worker pool.
// It returns a result and an error. You would replace interface{} with whatever
// result type your tasks are supposed to return.
type TaskFunc func() (interface{}, error)

// type TaskFuncWithId func(int64 id) (interface{}, error)
type TaskFuncWithId struct {
	Task   TaskFunc
	TaskId int64
}

type SubmitResult struct {
	TaskId int64
	Result interface{}
	Error  error
}

type Logger interface {
	Log(message string)
}

type defaultLogger struct{}

func (l *defaultLogger) Log(message string) {
	fmt.Println(message)
}

type WorkerPoolConfig struct {
	MaxWorkers    int           // Maximum number of worker goroutines
	Timeout       time.Duration // Idle timeout: a worker exits after waiting this long without receiving a task
	TaskQueueSize int           // Use with Caution
	Logger        Logger
}

type WorkerPool struct {
	isStopped       int32 // atomic flag
	config          WorkerPoolConfig
	ctx             context.Context
	cancel          context.CancelFunc
	wg              sync.WaitGroup
	publishers      chan TaskFuncWithId
	workerStopChans []chan bool
	taskId          atomic.Int64
	ResultChan      chan SubmitResult
	logger          Logger
	stats           map[string]prometheus.Metric
	server          *http.Server
}

func NewWorkerPool(workerPoolConfig WorkerPoolConfig) *WorkerPool {

	ctx, cancel := context.WithCancel(context.Background())

	pool := &WorkerPool{
		config:          workerPoolConfig,
		ctx:             ctx,
		cancel:          cancel,
		publishers:      make(chan TaskFuncWithId, workerPoolConfig.TaskQueueSize),
		workerStopChans: make([]chan bool, workerPoolConfig.MaxWorkers),
		taskId:          atomic.Int64{},
		ResultChan:      make(chan SubmitResult, workerPoolConfig.TaskQueueSize),
	}

	if workerPoolConfig.Logger != nil {
		pool.logger = workerPoolConfig.Logger
	} else {
		pool.logger = &defaultLogger{}
	}

	// A Gauge (not a Counter) so that the value can go down when workers exit.
	psRunningWorkers := prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "worker_pool_running_workers",
		Help: "The number of running workers",
	})

	psTotalSubmittedTasks := prometheus.NewCounter(prometheus.CounterOpts{
		Name: "worker_pool_total_submitted_tasks",
		Help: "The total number of tasks submitted to the worker pool",
	})

	psTotalExecutedTasks := prometheus.NewCounter(prometheus.CounterOpts{
		Name: "worker_pool_total_executed_tasks",
		Help: "The total number of tasks executed by the worker pool",
	})

	psTasksQueueSize := prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "worker_pool_tasks_queue_size",
		Help: "The size of the tasks queue",
	})

	// Register it with the default registry
	prometheus.MustRegister(psRunningWorkers)
	prometheus.MustRegister(psTotalSubmittedTasks)
	prometheus.MustRegister(psTotalExecutedTasks)
	prometheus.MustRegister(psTasksQueueSize)

	pool.stats = make(map[string]prometheus.Metric)
	pool.stats["psTotalSubmittedTasks"] = psTotalSubmittedTasks
	pool.stats["psTotalExecutedTasks"] = psTotalExecutedTasks
	pool.stats["psRunningWorkers"] = psRunningWorkers
	pool.stats["psTasksQueueSize"] = psTasksQueueSize

	for i := 0; i < workerPoolConfig.MaxWorkers; i++ {
		stopChan := make(chan bool)
		pool.workerStopChans[i] = stopChan
		pool.wg.Add(1)
		psRunningWorkers.Inc()
		go pool.worker(i+1, stopChan)
	}

	go pool.startPrometheus()
	return pool
}

func (wp *WorkerPool) startPrometheus() {
	// Create a ServeMux and register the Prometheus handler
	mux := http.NewServeMux()
	mux.Handle("/metrics", promhttp.Handler())

	// Create the server with the custom ServeMux
	wp.server = &http.Server{
		Addr:    ":8080",
		Handler: mux,
	}

	// Run the server in a goroutine so that it doesn't block
	go func() {
		if err := wp.server.ListenAndServe(); err != http.ErrServerClosed {
			// Handle error
			panic(err)
		}
	}()

	// Set up a channel to listen for OS signals for graceful shutdown
	// stop := make(chan os.Signal, 1)
	// signal.Notify(stop, os.Interrupt, syscall.SIGTERM)

	// Block until the pool's context is cancelled
	<-wp.ctx.Done()
	wp.logger.Log("prometheus server stopping")
	// Create a context with a timeout for the shutdown
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Gracefully shutdown the server, waiting for any in-flight requests to complete
	if err := wp.server.Shutdown(ctx); err != nil {
		// Handle error
		panic(err)
	}

	wp.logger.Log("prometheus server stopped")
}

// worker is a method on the WorkerPool that processes tasks from the publishers channel.
func (wp *WorkerPool) worker(id int, stopChan chan bool) {
	defer wp.wg.Done()
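	// Decrement the running-workers metric on exit. This only takes effect
	// when the metric is a Gauge; a Counter cannot be decremented.
	defer func() {
		if gauge, ok := wp.stats["psRunningWorkers"].(prometheus.Gauge); ok {
			gauge.Dec()
		}
	}()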
	defer wp.logger.Log(fmt.Sprintf("worker %d stopped\n", id))

	for {
		select {
		case <-wp.ctx.Done(): // Check if context was cancelled (pool is stopping)
			return
		case <-time.After(wp.config.Timeout):
			wp.logger.Log(fmt.Sprintf("worker %d timed out\n", id))
			return
		case <-stopChan: // Check if this specific worker was told to stop
			return
		case task, ok := <-wp.publishers: // Wait for a task
			if !ok {
				// The publishers channel was closed, no more tasks will come
				return
			}

			gauge, ok := wp.stats["psTasksQueueSize"].(prometheus.Gauge)
			if ok {
				gauge.Dec()
			}

			if task.Task != nil {
				wp.logger.Log(fmt.Sprintf("worker %d is working on task %d\n", id, task.TaskId))
				result, err := task.Task()

				if err != nil {
					wp.logger.Log(fmt.Sprintf("worker %d error on task %d: %v\n", id, task.TaskId, err))
				}
				counter, ok := wp.stats["psTotalExecutedTasks"].(prometheus.Counter)
				if ok {
					counter.Inc()
				}
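				// Skip only when there is neither a result nor an error to report,
				// so that a failed task (nil result, non-nil error) still reaches ResultChan.
				if result == nil && err == nil {
					continue
				}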

				taskResult := SubmitResult{
					TaskId: task.TaskId,
					Result: result,
					Error:  err,
				}

			loop:
				for {
					select {
					case wp.ResultChan <- taskResult:
						// Task sent successfully
						break loop
					default:
						// Channel is full, handle the case when the channel is full
						wp.logger.Log(fmt.Sprintf("worker %d stuck on sending task %d result, resultChan is full, cannot send result\n", id, task.TaskId))
						// TODO: instead of panic what to do??
						panic("resultChan is full, cannot send result")
					}
				}
			}
		}
	}
}

func (wp *WorkerPool) Submit(task TaskFunc) (int64, <-chan SubmitResult, error) {
	// Reject new tasks once the pool has been stopped.
	if atomic.LoadInt32(&wp.isStopped) == 1 {
		return 0, nil, errors.New("worker pool is not accepting new tasks")
	}
	taskId := wp.taskId.Add(1)
	counter, ok := wp.stats["psTotalSubmittedTasks"].(prometheus.Counter)
	if ok {
		counter.Inc()
	}
	// Use a distinct variable name so the TaskFuncWithId type is not shadowed.
	taskWithId := TaskFuncWithId{
		Task:   task,
		TaskId: taskId,
	}

loop:
	for {
		select {
		case wp.publishers <- taskWithId:
			// Task sent successfully
			gauge, ok := wp.stats["psTasksQueueSize"].(prometheus.Gauge)
			if ok {
				gauge.Inc()
			}
			break loop
		default:
			// Channel is full: log and retry. Note that this loop spins (and logs)
			// until a slot in the queue frees up.
			wp.logger.Log("publishers Channel is full, cannot send task")
		}
	}
	wp.logger.Log(fmt.Sprintf("worker pool submitted task %d\n", taskId))
	return taskId, wp.ResultChan, nil
}

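// WaitAll blocks until every worker goroutine has exited, for example after
// Stop is called or once each worker reaches its idle timeout.
func (wp *WorkerPool) WaitAll() {
	wp.wg.Wait()
	wp.logger.Log("all workers stopped")
}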

func (wp *WorkerPool) Stop() {
	atomic.StoreInt32(&wp.isStopped, 1)
	// First, stop all workers by cancelling the context.
	wp.cancel()

	// Wait for all workers to finish.
	wp.wg.Wait()

	for _, stopChan := range wp.workerStopChans {
		close(stopChan)
	}

	// Close the publishers channel to signal no more tasks will be sent.
	// This is safe only after we have ensured all workers have stopped.
	close(wp.publishers)
	close(wp.ResultChan)

	//TODO: Drain the resultChan.
	// Optionally, you can also drain the resultChan here if needed,
	// and possibly close it if no more results will be processed.
	// Be aware that closing a channel while it is still being written to
	// by other goroutines will cause a panic.
}


// Source: https://github.com/MohanL/workerpool
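
The TODOs around shutdown in the listing above leave open how to close and drain the channels safely. One possible ordering is sketched below as a separate, simplified pool; it is not the code from the repository. The names `Pool`, `Stop`, and `ErrStopped` are hypothetical, and because results are only drained inside Stop, the sketch assumes the number of submitted tasks fits in the buffers.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var ErrStopped = errors.New("pool is stopped")

// Pool is a simplified, hypothetical pool used to illustrate shutdown order.
type Pool struct {
	mu      sync.RWMutex // guards stopped and the closing of tasks
	stopped bool
	tasks   chan func() int
	results chan int
	wg      sync.WaitGroup
}

func NewPool(workers, queueSize int) *Pool {
	p := &Pool{
		tasks:   make(chan func() int, queueSize),
		results: make(chan int, queueSize),
	}
	for i := 0; i < workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			for task := range p.tasks { // exits once tasks is closed and drained
				p.results <- task()
			}
		}()
	}
	return p
}

// Submit holds a read lock while sending, so Stop cannot close the tasks
// channel in the middle of a send (which would panic).
func (p *Pool) Submit(task func() int) error {
	p.mu.RLock()
	defer p.mu.RUnlock()
	if p.stopped {
		return ErrStopped
	}
	p.tasks <- task
	return nil
}

// Stop is idempotent. Order: 1) refuse new tasks and close the tasks channel,
// 2) wait for workers, 3) close results (no writers remain), 4) drain results.
func (p *Pool) Stop() []int {
	p.mu.Lock()
	if p.stopped {
		p.mu.Unlock()
		return nil
	}
	p.stopped = true
	close(p.tasks)
	p.mu.Unlock()

	go func() {
		p.wg.Wait()
		close(p.results)
	}()

	var out []int
	for r := range p.results {
		out = append(out, r)
	}
	return out
}

func main() {
	p := NewPool(2, 8)
	for i := 1; i <= 5; i++ {
		i := i
		_ = p.Submit(func() int { return i * i })
	}
	fmt.Println(len(p.Stop()), p.Submit(func() int { return 0 }))
}
```

Unlike cancelling a context first, closing the tasks channel lets workers finish whatever is still queued before they exit.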